AggregateManyToManyAppender.java

package org.codefilarete.stalactite.engine.configurer.resolver.manytomany;

import java.util.Collection;
import java.util.Collections;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.codefilarete.reflection.AccessorChain;
import org.codefilarete.reflection.PropertyAccessor;
import org.codefilarete.stalactite.engine.configurer.IndexedAssociationRecordMapping;
import org.codefilarete.stalactite.engine.configurer.model.IntermediaryRelationJoin;
import org.codefilarete.stalactite.engine.configurer.model.ResolvedManyToManyRelation;
import org.codefilarete.stalactite.engine.configurer.resolver.AggregateResolver.AssemblyPoint;
import org.codefilarete.stalactite.engine.configurer.resolver.SkeletonAggregateResolver;
import org.codefilarete.stalactite.engine.listener.SelectListener;
import org.codefilarete.stalactite.engine.runtime.AssociationTable;
import org.codefilarete.stalactite.engine.runtime.ConfiguredRelationalPersister;
import org.codefilarete.stalactite.engine.runtime.IndexedAssociationRecord;
import org.codefilarete.stalactite.engine.runtime.IndexedAssociationTable;
import org.codefilarete.stalactite.engine.runtime.load.EntityInflater;
import org.codefilarete.stalactite.engine.runtime.load.EntityTreeInflater;
import org.codefilarete.stalactite.engine.runtime.load.JoinNode;
import org.codefilarete.stalactite.engine.runtime.onetomany.IndexedAssociationTableManyRelationDescriptor.InMemoryRelationHolder;
import org.codefilarete.stalactite.query.api.Fromable;
import org.codefilarete.stalactite.sql.ConnectionConfiguration;
import org.codefilarete.stalactite.sql.Dialect;
import org.codefilarete.stalactite.sql.ddl.structure.Column;
import org.codefilarete.stalactite.sql.ddl.structure.Table;
import org.codefilarete.stalactite.sql.result.ColumnedRow;
import org.codefilarete.tool.Nullable;
import org.codefilarete.tool.function.Hanger.Holder;

import static org.codefilarete.stalactite.engine.runtime.load.EntityJoinTree.JoinType.OUTER;
import static org.codefilarete.stalactite.engine.runtime.load.EntityJoinTree.ROOT_JOIN_NAME;

/**
 * Handles SELECT-path join-tree wiring for a {@link ResolvedManyToManyRelation}.
 * Write cascades are delegated to {@link ManyToManyResolver}.
 *
 * <p>Since many-to-many relations always use an intermediary association table, two join segments are always needed:
 * <ol>
 *   <li>A <em>passive join</em> from the source table to the association table (to filter rows).</li>
 *   <li>A <em>relation join</em> from the association table to the target table (to hydrate target beans).</li>
 * </ol>
 *
 * <p>When the association table carries an index column ({@link IndexedAssociationTable}), an
 * {@link InMemoryRelationHolder} is used instead of the plain relation fixer so that collection order is preserved.
 *
 * @author Guillaume Mary
 */
public class AggregateManyToManyAppender {
	
	/** Resolver in charge of the relation resolution; it hands the target persister back through a callback. */
	private final ManyToManyResolver manyToManyResolver;
	
	/**
	 * Creates an appender backed by a dedicated {@link ManyToManyResolver}.
	 *
	 * @param skeletonAggregateResolver passed as-is to the {@link ManyToManyResolver}
	 * @param dialect passed as-is to the {@link ManyToManyResolver}
	 * @param connectionConfiguration passed as-is to the {@link ManyToManyResolver}
	 */
	public AggregateManyToManyAppender(SkeletonAggregateResolver skeletonAggregateResolver,
	                                   Dialect dialect,
	                                   ConnectionConfiguration connectionConfiguration) {
		this.manyToManyResolver = new ManyToManyResolver(skeletonAggregateResolver, dialect, connectionConfiguration);
	}
	
	/**
	 * Appends the given many-to-many relation to the aggregate persister by:
	 * <ol>
	 *   <li>Delegating write-cascade setup to {@link ManyToManyResolver}.</li>
	 *   <li>Adding the two necessary join segments to the root persister's join tree.</li>
	 *   <li>Forwarding SELECT lifecycle events from the source persister to the target persister.</li>
	 * </ol>
	 *
	 * @param rootPersister persister whose join tree receives the association-table and target-table joins
	 * @param relation the resolved relation to wire
	 * @param assemblyPawn current position in the assembly: gives the relation owner persister, the join node name
	 *         to attach to, and the accessor leading to the relation owner
	 * @return an {@link AssemblyPoint} for the target entity, ready to be pushed onto the assembly queue
	 *         so that deeper relations are also resolved. The value is set from within the callback given to
	 *         {@link ManyToManyResolver}; it is {@code null} if that callback has not been invoked by the time
	 *         the resolver returns.
	 */
	public <SRC, SRCID, TRGT, TRGTID, S extends Collection<TRGT>,
			LEFTTABLE extends Table<LEFTTABLE>, RIGHTTABLE extends Table<RIGHTTABLE>>
	AssemblyPoint<?, ?, ?, ?> append(ConfiguredRelationalPersister<SRC, SRCID> rootPersister,
	                                 ResolvedManyToManyRelation<SRC, TRGT, S, SRCID, TRGTID, LEFTTABLE, RIGHTTABLE> relation,
	                                 AssemblyPoint<SRC, SRCID, TRGT, LEFTTABLE> assemblyPawn) {
		
		// Raw type on purpose: see the note on AssemblyPoint instantiation at the end of the callback
		Holder<AssemblyPoint> resultHolder = new Holder<>();
		manyToManyResolver.resolve(
				relation,
				assemblyPawn.getRelationOwnerPersister(),
				targetPersister -> {
					
					// At the root of the tree the relation accessor is enough, deeper we must prefix it with
					// the accessor that leads to the relation owner.
					// Constant-first comparison so that a null join point doesn't raise a NullPointerException.
					PropertyAccessor<SRC, S> accessor;
					if (ROOT_JOIN_NAME.equals(assemblyPawn.getParentJoinPoint())) {
						accessor = relation.getAccessor();
					} else {
						AccessorChain<SRC, S> shifter = new AccessorChain<>(assemblyPawn.getAccessor(), relation.getAccessor());
						shifter.setNullValueHandler(AccessorChain.RETURN_NULL);
						accessor = shifter;
					}
					
					@SuppressWarnings("unchecked")
					IntermediaryRelationJoin<LEFTTABLE, RIGHTTABLE, ?, SRCID, TRGTID> join =
							(IntermediaryRelationJoin<LEFTTABLE, RIGHTTABLE, ?, SRCID, TRGTID>) relation.getJoin();
					
					// Ordered relations need the index column of the association table to be read and applied
					String manyJoinName;
					if (relation.isOrdered()) {
						manyJoinName = appendIndexedAssociation(rootPersister, targetPersister, relation, assemblyPawn, join, accessor);
					} else {
						manyJoinName = appendAssociation(rootPersister, targetPersister, relation, assemblyPawn, join, accessor);
					}
					
					// Forward SELECT lifecycle events from the source entity's persister down to the target persister
					SelectListener<TRGT, TRGTID> targetSelectListener = targetPersister.getPersisterListener().getSelectListener();
					assemblyPawn.getRelationOwnerPersister().addSelectListener(new SelectListener<SRC, SRCID>() {
						@Override
						public void beforeSelect(Iterable<SRCID> ids) {
							// Only source identifiers are available here, hence the empty list for the target
							targetSelectListener.beforeSelect(Collections.emptyList());
						}
						
						@Override
						public void afterSelect(Set<? extends SRC> result) {
							// Gather every target bean reachable from the loaded sources, tolerating a null result
							// as well as null collections
							Set<TRGT> targets = Nullable.nullable(result)
									.map(r -> r.stream()
											.flatMap(src -> Nullable.nullable(relation.getAccessor().get(src))
													.map(Collection::stream)
													.getOr(Stream.empty()))
											.collect(Collectors.toSet()))
									.getOr(Collections.emptySet());
							targetSelectListener.afterSelect(targets);
						}
						
						@Override
						public void onSelectError(Iterable<SRCID> ids, RuntimeException exception) {
							// Only source identifiers are available here, hence the empty list for the target
							targetSelectListener.onSelectError(Collections.emptyList(), exception);
						}
					});
					
					// Preparing for next iteration
					// Note that we can't set the correct generics types to the AssemblyPoint instance
					// because we go a step further in the relation by shifting the types from SRC to TRGT
					resultHolder.set(new AssemblyPoint(relation.getTargetEntity(), targetPersister, manyJoinName, accessor));
				});
		
		return resultHolder.get();
	}
	
	/**
	 * Adds two join segments for the non-indexed association-table case:
	 * <ol>
	 *   <li>Passive join from source table to the association table.</li>
	 *   <li>Relation join from the association table to the target table, using the pre-built
	 *       {@link org.codefilarete.stalactite.sql.result.BeanRelationFixer} (which encodes optional
	 *       bidirectionality).</li>
	 * </ol>
	 *
	 * @param rootPersister persister whose join tree is enriched
	 * @param targetPersister persister of the target entity, gives the mapping used to inflate target beans
	 * @param relation the relation being wired, gives the relation fixer
	 * @param assemblyPawn gives the join node name to which the association table is attached
	 * @param rawJoin join description, gives the keys of both join segments
	 * @param accessor accessor registered on the relation join
	 * @return the name of the join node created for the target table
	 */
	private <SRC, SRCID, TRGT, TRGTID, S extends Collection<TRGT>,
			LEFTTABLE extends Table<LEFTTABLE>,
			RIGHTTABLE extends Table<RIGHTTABLE>,
			ASSOCIATIONTABLE extends AssociationTable<ASSOCIATIONTABLE, LEFTTABLE, RIGHTTABLE, SRCID, TRGTID>>
	String appendAssociation(ConfiguredRelationalPersister<SRC, SRCID> rootPersister,
	                         ConfiguredRelationalPersister<TRGT, TRGTID> targetPersister,
	                         ResolvedManyToManyRelation<SRC, TRGT, S, SRCID, TRGTID, LEFTTABLE, RIGHTTABLE> relation,
	                         AssemblyPoint<SRC, SRCID, TRGT, LEFTTABLE> assemblyPawn,
	                         IntermediaryRelationJoin<LEFTTABLE, RIGHTTABLE, ?, SRCID, TRGTID> rawJoin,
	                         PropertyAccessor<SRC, S> accessor) {
		
		// Only the wildcard is narrowed to the method-level ASSOCIATIONTABLE type variable
		@SuppressWarnings("unchecked")
		IntermediaryRelationJoin<LEFTTABLE, RIGHTTABLE, ASSOCIATIONTABLE, SRCID, TRGTID> join =
				(IntermediaryRelationJoin<LEFTTABLE, RIGHTTABLE, ASSOCIATIONTABLE, SRCID, TRGTID>) rawJoin;
		
		// Passive join: source table → association table, no association column is selected
		String associationTableJoinName = rootPersister.getEntityJoinTree().addPassiveJoin(
				assemblyPawn.getParentJoinPoint(),
				join.getLeftKey(),
				join.getLeftAssociationKey(),
				OUTER,
				Collections.emptySet());
		
		// Relation join: association table → target table
		return rootPersister.getEntityJoinTree().addRelationJoin(
				associationTableJoinName,
				new EntityInflater.EntityMappingAdapter<>(targetPersister.<RIGHTTABLE>getMapping()),
				accessor,
				join.getRightAssociationKey(),
				join.getRightKey(),
				null,
				OUTER,
				relation.getRelationFixer(),   // pre-built fixer handles bidirectionality if configured
				Collections.emptySet(),
				null);
	}
	
	/**
	 * Adds two join segments for the indexed association-table case.
	 * An {@link InMemoryRelationHolder} is used as the relation fixer so that collection ordering is
	 * restored in-memory after the flat SQL result is accumulated.
	 *
	 * @param rootPersister persister whose join tree is enriched and on which the sorting listener is registered
	 * @param targetPersister persister of the target entity, gives the mapping used to inflate target beans
	 * @param relation the relation being wired, gives the collection accessor and factory
	 * @param assemblyPawn gives the relation owner persister and the join node name to attach to
	 * @param rawJoin join description, gives the keys of both join segments and the indexed association table
	 * @param accessor accessor registered on the relation join
	 * @return the name of the join node created for the target table
	 */
	@SuppressWarnings("unchecked")
	private <SRC, SRCID, TRGT, TRGTID, S extends Collection<TRGT>,
			LEFTTABLE extends Table<LEFTTABLE>,
			RIGHTTABLE extends Table<RIGHTTABLE>,
			ASSOCIATIONTABLE extends IndexedAssociationTable<ASSOCIATIONTABLE, LEFTTABLE, RIGHTTABLE, SRCID, TRGTID>>
	String appendIndexedAssociation(ConfiguredRelationalPersister<SRC, SRCID> rootPersister,
	                                ConfiguredRelationalPersister<TRGT, TRGTID> targetPersister,
	                                ResolvedManyToManyRelation<SRC, TRGT, S, SRCID, TRGTID, LEFTTABLE, RIGHTTABLE> relation,
	                                AssemblyPoint<SRC, SRCID, TRGT, LEFTTABLE> assemblyPawn,
	                                IntermediaryRelationJoin<LEFTTABLE, RIGHTTABLE, ?, SRCID, TRGTID> rawJoin,
	                                PropertyAccessor<SRC, S> accessor) {
		
		IntermediaryRelationJoin<LEFTTABLE, RIGHTTABLE, ASSOCIATIONTABLE, SRCID, TRGTID> join =
				(IntermediaryRelationJoin<LEFTTABLE, RIGHTTABLE, ASSOCIATIONTABLE, SRCID, TRGTID>) rawJoin;
		Column<ASSOCIATIONTABLE, Integer> indexingColumn = join.getJoinTable().getIndexColumn();
		
		// The association join name is only known after addPassiveJoin(..) below, whereas the provider must be
		// declared before: the holder bridges that gap and is read lazily, when a row is decoded.
		Holder<String> associationTableJoinNodeNameHolder = new Holder<>();
		// Key made of the target identifier and its index in the collection
		Function<ColumnedRow, Object> duplicateIdentifierProvider = columnedRow -> {
			TRGTID identifier = targetPersister.getMapping().getIdMapping().getIdentifierAssembler().assemble(columnedRow);
			JoinNode<IndexedAssociationRecord, Fromable> joinNode =
					(JoinNode<IndexedAssociationRecord, Fromable>) rootPersister.getEntityJoinTree()
							.getJoin(associationTableJoinNodeNameHolder.get());
			ColumnedRow rowDecoder = EntityTreeInflater.currentContext().getDecoder(joinNode);
			Integer targetEntityIndex = rowDecoder.get(indexingColumn);
			return identifier + "-" + targetEntityIndex;
		};
		
		// Passive join: source table → association table (include all columns for index decoding)
		String associationTableJoinName = rootPersister.getEntityJoinTree().addPassiveJoin(
				assemblyPawn.getParentJoinPoint(),
				join.getLeftKey(),
				join.getLeftAssociationKey(),
				OUTER,
				join.getJoinTable().getColumns());
		associationTableJoinNodeNameHolder.set(associationTableJoinName);
		
		// The InMemoryRelationHolder buffers entities with their index and applies the sorted order after select
		// Note: getMappedByAccessor() is null in model.ManyToManyRelation; bidirectionality on the read path is a
		// known limitation for indexed M2M (consistent with indexed OneToMany with association table).
		InMemoryRelationHolder<SRC, SRCID, TRGT, S> inMemoryRelationFixer = new InMemoryRelationHolder<>(
				assemblyPawn.getRelationOwnerPersister()::getId,
				relation.getAccessor(),
				relation.getComponentFactory(),
				null);
		
		// NOTE(review): this listener is registered on rootPersister whereas append(..) forwards select events
		// from assemblyPawn.getRelationOwnerPersister(); for a nested relation those two persisters may differ
		// - confirm that applySort(..) receives relation owner entities in that case.
		rootPersister.addSelectListener(new SelectListener<SRC, SRCID>() {
			@Override
			public void beforeSelect(Iterable<SRCID> ids) {
				inMemoryRelationFixer.init();
			}
			
			@Override
			public void afterSelect(Set<? extends SRC> result) {
				// Buffered state must be released even if sorting fails, else it would outlive this select
				try {
					inMemoryRelationFixer.applySort(result);
				} finally {
					inMemoryRelationFixer.clear();
				}
			}
			
			@Override
			public void onSelectError(Iterable<SRCID> ids, RuntimeException exception) {
				inMemoryRelationFixer.clear();
			}
		});
		
		// Relation join: association table → target table
		String manyJoinName = rootPersister.getEntityJoinTree().addRelationJoin(
				associationTableJoinName,
				new EntityInflater.EntityMappingAdapter<>(targetPersister.<RIGHTTABLE>getMapping()),
				accessor,
				join.getRightAssociationKey(),
				join.getRightKey(),
				null,
				OUTER,
				inMemoryRelationFixer,
				Collections.emptySet(),
				duplicateIdentifierProvider);
		
		// Attach a consumption listener: each time a target row is consumed, record its position from the association table row
		JoinNode<TRGT, Fromable> joinNode = (JoinNode<TRGT, Fromable>) rootPersister.getEntityJoinTree().getJoin(manyJoinName);
		JoinNode<?, Fromable> associationJoinNode = rootPersister.getEntityJoinTree().getJoin(associationTableJoinName);
		IndexedAssociationRecordMapping<ASSOCIATIONTABLE, LEFTTABLE, RIGHTTABLE, SRCID, TRGTID> associationRecordMapping =
				new IndexedAssociationRecordMapping<>(
						join.getJoinTable(),
						assemblyPawn.getRelationOwnerPersister().getMapping().getIdMapping().getIdentifierAssembler(),
						targetPersister.getMapping().getIdMapping().getIdentifierAssembler(),
						join.getJoinTable().getLeftIdentifierColumnMapping(),
						join.getJoinTable().getRightIdentifierColumnMapping());
		joinNode.setConsumptionListener((trgt, columnValueProvider) -> {
			// The association record is rebuilt from the association table columns of the current row
			ColumnedRow rowDecoder = EntityTreeInflater.currentContext().getDecoder(associationJoinNode);
			IndexedAssociationRecord associationRecord = associationRecordMapping.getRowTransformer().transform(rowDecoder);
			inMemoryRelationFixer.addIndex((SRCID) associationRecord.getLeft(), trgt, associationRecord.getIndex());
		});
		
		return manyJoinName;
	}
}